# Copyright 2018-2023 Gentoo Authors
# Distributed under the terms of the GNU General Public License v2

import time

import functools
import pytest

from portage.tests import TestCase
from portage.util._async.ForkProcess import ForkProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.futures import asyncio
from portage.util.futures.iter_completed import (
    iter_completed,
    async_iter_completed,
)


class SleepProcess(ForkProcess):
    """
    ForkProcess whose target is ``time.sleep(seconds)``.

    When the process exits, ``seconds`` is stored as the result of
    ``future``, unless that future has been cancelled in the meantime.
    Both ``future`` and ``seconds`` must be assigned before starting.
    """

    __slots__ = ("future", "seconds")

    def _start(self):
        """Set the sleep target, register the exit listener, and start."""
        sleep_call = functools.partial(time.sleep, self.seconds)
        self.target = sleep_call
        # Registered before the base class start so the exit is reported
        # back through _future_done.
        self.addExitListener(self._future_done)
        ForkProcess._start(self)

    def _future_done(self, task):
        """Exit listener: publish the sleep duration through the future."""
        if self.future.cancelled():
            # Nothing to report to a cancelled future.
            return
        self.future.set_result(self.seconds)


class IterCompletedTestCase(TestCase):
    """Tests for iter_completed and async_iter_completed."""

    # Marked as a non-strict expected failure, since we don't want to fail
    # if heavy system load causes the tasks to finish in an unexpected order.
    @pytest.mark.xfail(strict=False)
    def testIterCompleted(self):
        """
        Start sleep processes of differing durations and check that the
        futures come back from iter_completed in ascending order of
        sleep duration.
        """
        loop = global_event_loop()
        sleepers = [
            SleepProcess(seconds=duration) for duration in (0.200, 0.100, 0.001)
        ]

        # The shortest sleep is expected to complete first.
        expected_order = [sleeper.seconds for sleeper in sleepers]
        expected_order.sort()

        def started_futures():
            # Each process is started lazily, when its future is requested.
            for sleeper in sleepers:
                sleeper.future = loop.create_future()
                sleeper.scheduler = loop
                sleeper.start()
                yield sleeper.future

        # iter_completed is deliberately called inline: binding it to a
        # local name would keep the generator alive past the loop.
        for expected_seconds, completed in zip(
            expected_order,
            iter_completed(started_futures(), max_jobs=True, max_load=None, loop=loop),
        ):
            self.assertEqual(expected_seconds, completed.result())

    def testAsyncCancel(self):
        """
        Cancel the first future yielded by async_iter_completed, then check
        that every input future was still generated and that each one ends
        up either cancelled or with a retrievable result.
        """
        loop = global_event_loop()
        input_futures = set()
        future_count = 3

        def resolve_if_pending(pending):
            # Futures that are already done (e.g. cancelled) are left alone.
            if not pending.done():
                pending.set_result(None)

        def future_generator():
            for _ in range(future_count):
                future = loop.create_future()
                loop.call_soon(resolve_if_pending, future)
                input_futures.add(future)
                yield future

        for future_done_set in async_iter_completed(
            future_generator(), max_jobs=True, max_load=True, loop=loop
        ):
            # Only the first yielded future is needed: cancel it and stop.
            future_done_set.cancel()
            break

        # With max_jobs=True, async_iter_completed should have executed
        # the generator until it raised StopIteration.
        self.assertEqual(future_count, len(input_futures))

        loop.run_until_complete(asyncio.wait(input_futures, loop=loop))

        # The futures may have results or they may have been cancelled
        # by TaskScheduler, and behavior varies depending on the python
        # interpreter.
        for future in input_futures:
            if not future.cancelled():
                # result() raises if the future holds an exception.
                future.result()
